[アップデート]Step Functionsで動的並列処理がサポートされました!
Step Functionsにアップデートがあり、動的並列処理がサポートされました!
これにより、前段のステート等で動的に生成した配列に対して、繰り返し処理、おまけに並列処理を行う事が可能になりました。
早速試してみたいと思います。
やってみるための準備
ステートマシンから呼び出す、シンプルなLambda関数を作成します。
ここでは、CloudWatch Logsに存在するロググループを取得して、 取得したロググループそれぞれに対して、繰り返し処理をしてみたいと思います。
Lambda関数作成
DescribeLogGroups関数
CloudWatch Logsから情報を取得するだけのシンプルな関数です。 取得結果をすべて呼び出し元(ここではステートマシン)に返しています。
import boto3 logs_client = boto3.client('logs') def lambda_handler(event, context): #CloudWatch Logs情報取得 response = logs_client.describe_log_groups() return response
PrintLogGroups関数
入力で受け取った値を書き出すだけのシンプルな関数です。
import boto3 logs_client = boto3.client('logs') def lambda_handler(event, context): log_group_name = event['logGroupName'] print(log_group_name) return log_group_name
ステートマシン
ステートマシンのワークフローは以下となります。
繰り返し並列処理するぞ感が可視化されていい感じです。
ステートマシンの定義は以下となります。
{ "StartAt": "DescribeLogGroupsState", "States": { "DescribeLogGroupsState": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:DescribeLogGroups", "ResultPath": "$.describe_log_groups", "Next": "PrintLogGroupState" }, "PrintLogGroupState": { "Type": "Map", "InputPath": "$.describe_log_groups", "ItemsPath": "$.logGroups", "MaxConcurrency": 1, "Iterator": { "StartAt": "IteratorState", "States": { "IteratorState": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:PrintLogGroups", "End": true } } }, "End": true } } }
定義について説明します。
DescribeLogGroupsStateステート
このステートが実行されると、Lambda関数(DescribeLogGroups)が実行されます。Lambda関数で戻した値がResultPath
で指定した"describe_log_groups"に格納されます。ここでは、CloudWatch Logsのdescribe結果を戻しているので、以下のような値になります。
{ "describe_log_groups": { "logGroups": [ { "logGroupName": "/aws/lambda/DescribeExportTask", "creationTime": 1568873974706, "metricFilterCount": 0, "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*", "storedBytes": 0 }, (省略) { "logGroupName": "/aws/sagemaker/TrainingJobs", "creationTime": 1567947081632, "metricFilterCount": 0, "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*", "storedBytes": 11247 } ], "ResponseMetadata": { "RequestId": "2c574e1f-d270-4587-965e-8b7b78ccdfeb", "HTTPStatusCode": 200, "HTTPHeaders": { "x-amzn-requestid": "2c574e1f-d270-4587-965e-8b7b78ccdfeb", "content-type": "application/x-amz-json-1.1", "content-length": "1445", "date": "Thu, 19 Sep 2019 08:24:35 GMT" }, "RetryAttempts": 0 } } }
PrintLogGroupStateステート
新たに追加されたステートTypeMap
を利用しています。
InputPath
に"describe_log_groups"を指定していますので、渡された入力のうち、"describe_log_groups"のみ、このステートで使用します。
ItemsPath
には、入力の配列を指定します。ここでは、"$.logGroups"を指定していますので、以下のような値が実行時の入力に渡されます。
{ "logGroupName": "/aws/lambda/DescribeExportTask", "creationTime": 1568873974706, "metricFilterCount": 0, "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*", "storedBytes": 0 }, (省略) { "logGroupName": "/aws/sagemaker/TrainingJobs", "creationTime": 1567947081632, "metricFilterCount": 0, "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*", "storedBytes": 11247 }
渡された配列の要素数分、Iterator
で指定したステートが実行されますので、ここでは"IteratorState"ステートが繰り返し実行されます。
MaxConcurrency
は、並列処理数の制限です。デフォルト値は"0"となります。これは、並列処理に制限を設けず、可能な限り同時にステートが呼び出しされます。ここでは、"1"を指定しているので、前の処理が完了するまで新しい繰り返し処理は開始されません。
Mapの詳細については、以下をご確認ください。
やってみた
上記定義を利用した実行結果になります。
インデックスを選択でき、処理を行った際の入出力を確認することができました。すごくわかりやすいです。
PrintLogGroups関数のログをみてみたいと思います。入力で渡された配列数分繰り返しLambda関数が起動されていることが確認できました。
ステートマシン定義内のMaxConcurrency
を"0"にすると、並列実行されていることも確認できました。
さいごに
これまで、Step Functionsで動的に繰り返し処理をするとなると、インデックス等をどこかで実装する必要がありました。先日、以下のブログを書いた際に、もっと楽に繰り返し処理ができれば..と感じていたところです。
今回のアップデートにより、ステートマシンの定義や、Lambda関数がこれまで以上にシンプルにでき、ワークフローの実装が捗りそうですね!!